;;;; UTIL
(defun make-comparison-expr (field value)
  `(equal (getf lst ,field) ,value))

(defun make-comparisons-list (fields)
  (loop while fields
     collecting (make-comparison-expr (pop fields) (pop fields))))

(defmacro where (&rest clauses)
  `#'(lambda (lst) (and ,@(make-comparisons-list clauses))))

(defun select (lst selector-fn)
  (remove-if-not selector-fn lst))

(defun update (lst selector-fn &rest clauses)
  (let ((field (pop clauses))(value (pop clauses)))
    (setf lst
	  (mapcar
	   #'(lambda (msg)
	       (when (funcall selector-fn msg)
		 (setf (getf msg field) value)
		 msg)) lst))))

(defun max-opid (lst)
  (let ((curr-max '(-1 -1)))
    (loop for opid in lst do
      (let ((lamp (car opid))(actor-id (cadr opid)))
	(if (> lamp (car curr-max))
	    (setf curr-max (list lamp (cadr opid)))
	    (if (and (= lamp (car curr-max))(> actor-id (cadr curr-max)))
		(setf curr-max (list lamp actor-id))))))
    curr-max))

(defun remove-if-succ (schema lst)
  (remove-if-not #'(lambda (msg) (is-visible (getf schema :succ) (getf msg :opid))) lst))

(defun create-succ (schema opid)
  (setf (getf schema :succ) (cons (list :opid opid :succ nil)(getf schema :succ))))

(defun insert-succ (schema opid succ)
  (if (select (getf schema :succ) (where :opid opid))
      (let ((target-succ (getf (car (select (getf schema :succ) (where :opid opid))) :succ)))
	(setf (getf (car (select (getf schema :succ) (where :opid opid))) :succ) (cons succ target-succ)))
      (setf (getf schema :succ) (cons (list :opid opid :succ (list succ)) (getf schema :succ)))))

(defun to-opid-list (lst)
  (loop for msg in lst
	collect (getf msg :opid)))

;;;; CORE
(defun create-schema (&key (id (random 10000)) (messages nil) (contacts nil))
  (list :id id
	:cnt 0
	:messages messages
	:contacts contacts
	:succ nil))

(defun clock-tick (schema)
  (setf (getf schema :cnt) (1+ (getf schema :cnt))))

(defun clock-set (schema cnt)
  (setf (getf schema :cnt) cnt))

(defun create-msg (opid objid prop action value dep)
(list :opid opid
      :objid objid
      :prop prop
      :action action
      :value value
      :dep dep))

(defun is-visible (lst opid)
  (not (select lst (where :opid opid))))

 ;;;Implements last-writer-wins conflict resolution, conflicts is a list of messages
(defun lww (conflicts)
  (max-opid (to-opid-list conflicts)))

(defun search-msg (schema table prop)
  (remove-if-succ schema (select (getf schema :messages) (where :prop prop :objid table))))

;;;; GET
(defun get-msg (schema table prop)
  (getf (car (select (getf schema :messages) (where :opid (lww (search-msg schema table prop))))) :value))

(defun insert-msg (schema msg)
  (let ((opid (getf msg :opid)))
    (loop for contact in (getf schema :contacts)
	  do (push (create-msg-state opid) (getf contact :msg-state)))
    (push msg (getf schema :messages))))

;;;; PUT
(defun put-msg (schema table prop value)
  (let ((pred (to-opid-list (search-msg schema table prop)))
	(new-opid (list (clock-tick schema) (getf schema :id))))
    (let ((msg (create-msg new-opid table prop "set value" value pred)))
      (insert-msg schema msg)
      (loop for p in pred do
	(insert-succ schema p new-opid)))))

;;;; DELETE
(defun delete-msg (schema table prop)
  (let ((pred (to-opid-list (search-msg schema table prop)))
	(new-opid (list (clock-tick schema) (getf schema :id))))
    (loop for p in pred do
      (insert-succ schema p new-opid))))

;;;; PUT-OBJECT
(defun put-obj (schema table prop)
  (let ((pred (to-opid-list (search-msg schema table prop)))
	(objid (list (clock-tick schema) (getf schema :id)))
	(new-opid (list (clock-tick schema) (getf schema :id))))
    (let ((msg (create-msg new-opid table prop "create map" objid pred)))
      (insert-msg schema msg)
      (loop for p in pred do
	(insert-succ schema p new-opid)))
    objid))

;;;; Maintaining per-contact, per-messages, notekeeping

(defun create-msg-state (opid &key (seen nil)(ack nil)(req nil)(send-cnt 0)(send-time 0)(max-latency 10000))
  (list :opid opid
	:seen seen
	:ack ack
	:req req
	:send-cnt send-cnt
	:send-time send-time
	:max-latency max-latency))

(defun msg-state-from-msg (msg)
  (create-msg-state (getf msg :opid)))

(defun add-contact (schema name conid)
  (push (append (list :name name :conid conid)
		(list :msg-state
		      (loop for msg in (getf schema :messages)
			    collect (msg-state-from-msg msg))))
	(getf schema :contacts))
  schema)

(defun get-msg-by-opid (schema opid)
  (car (select (getf schema :messages) (where :opid opid))))

(defun get-msg-state-by-conid (schema conid)
  (let ((contact (car (select (getf schema :contacts) (where :conid conid)))))
    (getf contact :msg-state)))

(defun send-eager-update (schema conid)
  (let ((con-msg-state (get-msg-state-by-conid schema conid)))
    (let ((outgoing-msgs (to-opid-list (select con-msg-state (where :seen nil))))
	  (outgoing-acks (to-opid-list (select con-msg-state (where :ack t)))))
      (let ((msg-records
	      (loop for msg in outgoing-msgs
		    collect (list 1 (get-msg-by-opid schema msg))))
	    (ack-record (list 0 outgoing-acks))
	    (clock-record (list 5 (getf schema :cnt))))
	(loop for ackd in outgoing-acks do
	  (mark-ack schema conid ackd NIL))
	(cons clock-record (cons ack-record msg-records))))))

(defun has-all-deps (deps msgs incoming-msgs)
  (loop for m in deps do
    (if (not (position m (append msgs incoming-msgs) :test #'equal))
	(return-from has-all-deps NIL)))
  T)

(defun mark-seen (schema conid opid)
  (setf (getf (car (select (get-msg-state-by-conid schema conid) (where :opid opid))) :seen) t))

(defun mark-ack (schema conid opid state)
  (setf (getf (car (select (get-msg-state-by-conid schema conid) (where :opid opid))) :ack) state))

(defun add-incoming-msg (schema conid msg incoming-msgs)
  (if (has-all-deps (getf msg :deps) (getf schema :messages) incoming-msgs)
      (mark-seen schema conid (getf msg :opid))))

(defun process-eager-update (schema conid records)
  (let ((incoming-msgs
	  (loop for record in records
		collect (if (eq (car record) 1) (car (cdr record))))))
    (loop for record in records do
      (let ((flag (car record))
	    (body (car (cdr record))))
	(if (equal flag 0)
	    (loop for opid in body do
	      (cond ((get-msg-by-opid schema opid)
		     (mark-ack schema conid opid NIL)
		     (mark-seen schema conid opid)))))
	(if (equal flag 1)
	    (if (get-msg-by-opid schema (getf body :opid))
		(mark-seen schema conid (getf body :opid))
		(cond 
		  ((not (getf body :dep))
		   (insert-msg schema body)
		   (mark-seen schema conid (getf body :opid))
		   (mark-ack schema conid (getf body :opid) t))
		  ((has-all-deps (getf body :dep)
				 (to-opid-list (getf schema :messages))
				 (to-opid-list incoming-msgs))
		   (insert-msg schema body)
		   (mark-seen schema conid (getf body :opid))
		   (mark-ack schema conid (getf body :opid) t)
		   (loop for dep in (getf body :dep) do
		     (insert-succ schema dep (getf body :opid)))))))
	(if (equal flag 2)
	    (princ "process 2"))
	(if (equal flag 3)
	    (princ "process 3"))
	(if (equal flag 4)
	    (princ "process 4"))
	(if (equal flag 5)
	    (clock-set schema (max body (getf schema :cnt))))))))

